#!/usr/bin/env python
#
# Copyright (C) 2014-2016 DNAnexus, Inc.
#
# This file is part of dx-toolkit (DNAnexus platform client libraries).
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
#   use this file except in compliance with the License. You may obtain a copy
#   of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations
#   under the License.

''' This helper script uploads all output files from the execution environment
to the job's workspace.

Overview
   Upload everything that is in the output directory, and generate a
   $HOME/job_output.json file that describes it.

   (a) Figure out what exists in the output directory, and is relevant.
      The relevant formats are:
      <odir>/xxx/yyy
          xxx == key
          yyy == file name
   (b) If there is an output spec, compare against it.
   (c) Upload everything that is in the output directory
   (d) Generate a $HOME/job_output.json file that describes it.
'''
import concurrent.futures
import os
import stat
import sys
import json
import argparse
import dxpy
from dxpy.utils import file_load_utils
from dxpy.utils.printing import fill, refill_paragraphs, BOLD, RED

# Help text for the command line parser. The text below is emitted at
# runtime (it is handed to refill_paragraphs() when the parser is built),
# so it is user-facing output rather than a comment.
description = BOLD('Note') + ''': this is a utility for use by bash apps
running in the DNAnexus Platform.

Uploads all files and subdirectories in the directory $HOME/out, as
described below. It also adds relevant entries into the job_output.json
file.

By convention, only directories with names equal to output parameter
names are expected to be found in the output directory, and any file(s)
found in those subdirectories will be uploaded as the corresponding
outputs.  For example, a file with the path

    $HOME/out/FOO/OUTPUT.TXT

will be uploaded, and the key "FOO" will be added to the job_output.json
file with value

    {"$dnanexus_link": "file-xxxx"}

where "file-xxxx" is the ID of the newly uploaded file. If multiple
files are found, they will be added as an array output (in unspecified
order). If subdirectories are found under $HOME/out/FOO, then they are
uploaded in their entirety to the workspace, and values are added
to FOO in the job_output.json file. For example, the path:

    $HOME/out/FOO/BAR/XXX.TXT

will be uploaded to /BAR/XXX.TXT.
'''

# Command line interface. The parser is built and evaluated when the file is
# executed, and the resulting `args` namespace is read by the entry point code
# at the bottom of the file.
parser = argparse.ArgumentParser(description=refill_paragraphs(description),
                                 formatter_class=argparse.RawTextHelpFormatter)
# This tool uploads outputs, so the name given to --except is an output name.
parser.add_argument('--except',
                    help=fill('Do not upload the output with this name. (May be used multiple times.)',
                              width_adjustment=-20),
                    action="append",
                    dest="exclude",
                    default=[])

# --parallel and --sequential write to the same destination; sequential
# upload is the default when neither is given.
parser.set_defaults(parallel=False)
parser.add_argument("--parallel", help="Upload the files in parallel", action="store_true",
                    dest="parallel")
parser.add_argument("--sequential", help="Upload the files sequentially", action="store_false",
                    dest="parallel")
# NOTE: no action is specified, so argparse stores the value that follows
# the option. The entry point only tests the stored value for truthiness,
# which means any non-empty value (even "false") enables clearing.
parser.add_argument('--clearJSON',
                    help='Clears the output JSON file prior to starting upload.')
parser.add_argument("--wait-on-close", help="Wait for files to close, default is not to wait",
                    action="store_true",
                    default=False,
                    dest="wait_on_close")
# Parse the command line exactly once.
args = parser.parse_args()

# Upper bound on the number of worker threads used by the parallel upload.
max_num_parallel_uploads = 8

def report_error_and_exit(err_type, message):
    ''' Record a fatal error and terminate the process with status 1.

        The caller is a bash script, so raising an exception would not be
        useful. The error is echoed on stderr and also written, as JSON, to
        the file job_error.json in the user's home directory.

        :param err_type: value stored under error.type
        :param message: value stored under error.message

        TODO: refactor, shared code with dx-jobutil-report-error
    '''
    error_path = os.path.expanduser(os.path.join('~', 'job_error.json'))
    details = {"type": err_type, "message": message}
    sys.stderr.write("{}: {}\n".format(err_type, message))
    with open(error_path, 'w') as error_file:
        error_file.write(json.dumps({"error": details}, indent=4) + '\n')
    sys.exit(1)


def get_output_spec():
    ''' Extract the outputSpec, if it exists.

    The spec is looked up in one of two places:
      - when DX_JOB_ID is set, from the description of the app/applet the
        job belongs to (only for the "main" function of the job);
      - otherwise, when DX_TEST_DXAPP_JSON is set, from the dxapp.json
        file that variable points at.

    :returns: None when no output spec is available, else a dictionary
        mapping each output name to {'class': ..., 'optional': ...}.
        'optional' defaults to False when the spec entry does not set it.
    '''
    output_spec = None
    if 'DX_JOB_ID' in os.environ:
        # works in the cloud, not locally
        job_desc = dxpy.describe(dxpy.JOB_ID)
        if job_desc["function"] == "main":
            # The output spec does not apply for subjobs
            desc = dxpy.describe(job_desc.get("app", job_desc.get("applet")))
            if "outputSpec" in desc:
                output_spec = desc["outputSpec"]
    elif 'DX_TEST_DXAPP_JSON' in os.environ:
        # works only locally
        with open(os.environ['DX_TEST_DXAPP_JSON'], 'r') as fd:
            dxapp_json = json.load(fd)
        output_spec = dxapp_json.get('outputSpec')

    if output_spec is None:
        return None

    # Convert the list of {name, class, ...} entries into a dictionary
    # keyed by name, recording the class and whether the field is optional.
    return {spec['name']: {'class': spec['class'],
                           'optional': spec.get('optional', False)}
            for spec in output_spec}


def listdir_only_files_and_subdirs(path):
    ''' List the immediate children of *path*, split by kind.

    Returns a pair (files, subdirs) of entry names. Names starting with a
    dot and symbolic links are skipped. Regular files and FIFOs are
    reported as files; anything that is neither a directory nor one of
    those is ignored. '''
    regular_files, directories = [], []
    for entry in os.listdir(path):
        if entry.startswith('.'):
            # hidden file or directory
            continue
        entry_path = os.path.join(path, entry)
        if os.path.islink(entry_path):
            # symbolic links are never followed
            continue
        if os.path.isdir(entry_path):
            directories.append(entry)
            continue
        if os.path.isfile(entry_path) or stat.S_ISFIFO(os.stat(entry_path).st_mode):
            regular_files.append(entry)
    return regular_files, directories

def deep_list(base_path, rel_path):
    '''
    Recursively collect the files found under *base_path*/*rel_path*.

    :param base_path: path in the local filesystem to a directory
    :type base_path: string
    :param rel_path: path of the directory to scan, relative to *base_path*
    :type rel_path: string
    :returns: paths of all the files found, each relative to *base_path*.
        The files directly inside a directory are listed before the
        contents of its subdirectories.
    :rtype: string list
    '''
    names, nested = listdir_only_files_and_subdirs(os.path.join(base_path, rel_path))
    collected = [os.path.join(rel_path, name) for name in names]
    for child in nested:
        collected += deep_list(base_path, os.path.join(rel_path, child))
    return collected

def get_output_subdir_info():
    '''
    Describe the contents of the output directory.

    Only entries of the following form are of interest:
    <odir>/xxx/yyy
         xxx == key
         yyy == file name

    An array output is simply several files under the same key:
       <odir>/xxx/yyy
       <odir>/xxx/vvv
       <odir>/xxx/zzz

    Returns a dictionary that maps each subdirectory name (key) to a
    record with these fields:
       path           path of the subdirectory
       files          names of the files directly inside it
       child_dirs     names of the directories directly inside it
       child_dfiles   for each child directory, the list of files found
                      below it (paths relative to the subdirectory)
       dx_links       empty list, filled in once the files are uploaded
       tot_num_files  number of files, at any depth
    An empty dictionary is returned if the output directory does not exist.
    '''
    odir = file_load_utils.get_output_dir()
    if not os.path.isdir(odir):
        return {}
    subdir_recs = {}  # mapping from name to attributes
    for subdir in listdir_only_files_and_subdirs(odir)[1]:
        path = os.path.join(odir, subdir)
        if not os.path.isdir(path):
            continue
        files, child_dirs = listdir_only_files_and_subdirs(path)
        # one list of files per child directory, in the same order
        child_dfiles = [deep_list(path, ddir) for ddir in child_dirs]
        tot_num_files = len(files) + sum(len(dfiles) for dfiles in child_dfiles)
        subdir_recs[subdir] = {'path': path,
                               'files': files,
                               'child_dirs': child_dirs,
                               'child_dfiles': child_dfiles,
                               'dx_links': [],
                               'tot_num_files': tot_num_files}
    return subdir_recs


def compare_to_output_spec_and_annotate(subdir_recs, output_spec):
    '''Check the subdirectories found in the output directory against the
    output specification, and annotate the directory records.

    *output_spec* maps each output name to a record with 'class' and
    'optional' fields. For every output of class "file" or "array:file"
    that has a subdirectory, the record in *subdir_recs* gets a 'class'
    field. The process is terminated, through report_error_and_exit, when:
      - an output of class "file" has more than one file, or
      - a subdirectory exists for an output of any other class.
    Missing outputs are not reported here, the jobmanager does that.
    '''
    for key in output_spec:
        field = output_spec[key]
        class_ = field['class']
        found = key in subdir_recs

        if class_ not in ('file', 'array:file'):
            # a subdirectory makes no sense for an output that is
            # neither "file" nor "array:file"
            if found:
                report_error_and_exit(
                    "AppInternalError",
                    "key {} is of class {} but it appears in the output directory".
                    format(key, class_))
            continue

        is_optional = field['optional']
        if not found:
            # Note: the jobmanager will throw an error for missing outputs
            continue

        # Annotate with a class, and sanity check
        rec = subdir_recs[key]
        rec['class'] = class_
        num_files = rec['tot_num_files']
        if class_ == 'file':
            if num_files > 1:
                report_error_and_exit(
                    "AppInternalError",
                    "Expected to find 1 file for key {} of class file, but there are {} files".format(key, num_files))
            if num_files == 0 and not is_optional:
                # This is an error that the jobmanager will deal with.
                continue
        # for an array of files any number of files is fine, including zero

def create_entry(subdir_desc, local_path, target_path, basename):
    ''' Build the record holding everything needed to upload one file.

    Each file gets its own record so that parallel threads of execution
    do not have to share data. The 'dxlink' field starts out as None and
    is set once the file has been uploaded.
    '''
    return dict(subdir_desc=subdir_desc,
                local_dir_path=local_path,
                target_dir_path=target_path,
                fname=basename,
                dxlink=None)

def upload_one_file(entry, wait_on_close):
    '''Upload one file from the output directory, and store a link to the
    uploaded object in entry['dxlink'].

    When entry['target_dir_path'] is None no folder is passed to the
    upload call. Otherwise the file is uploaded into the folder
    "/<target_dir_path>", with parents=True.
    '''
    local_path = os.path.join(entry['local_dir_path'], entry['fname'])
    target_dir = entry['target_dir_path']
    no_folder = target_dir is None

    if no_folder:
        trg_path = "/{}".format(entry['fname'])
    else:
        trg_path = "/{}/{}".format(target_dir, entry['fname'])
    # announce the upload before it starts, and make sure it is visible
    print("uploading file: {} -> {}".format(local_path, trg_path))
    sys.stdout.flush()

    if no_folder:
        f_obj = dxpy.upload_local_file(local_path, wait_on_close=wait_on_close)
    else:
        f_obj = dxpy.upload_local_file(local_path,
                                       folder=("/" + target_dir),
                                       parents=True, wait_on_close=wait_on_close)
    entry['dxlink'] = dxpy.dxlink(f_obj)

def sequential_file_upload(to_upload, wait_on_close):
    '''Upload the given entries one after the other, in the order given.

    Each entry is handed to upload_one_file together with *wait_on_close*.
    '''
    for pending in to_upload:
        upload_one_file(pending, wait_on_close)

def parallel_file_upload(to_upload, wait_on_close):
    ''' Same as sequential_file_upload, but in parallel.

    Every entry is submitted to a thread pool that runs at most
    max_num_parallel_uploads uploads at a time. If an upload raises, the
    failing file and the exception are printed, and the exception is
    raised again to the caller.
    '''
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_num_parallel_uploads) as executor:
        # map each future back to its entry, so a failure can name the file
        future_files = {executor.submit(upload_one_file, e, wait_on_close): e for e in to_upload}
        for future in concurrent.futures.as_completed(future_files):
            try:
                # result() re-raises whatever the upload raised
                future.result()
            except Exception as exc:
                # Report the exception itself; the entry's dxlink is still
                # None when the upload has failed.
                print('file %s generated an exception %r' % (future_files[future]['fname'], exc))
                raise

def update_output_json(subdir_recs):
    ''' Update the output json file with links to the uploaded files.

    The existing content of the file, if any, is loaded first. Then, for
    every record in *subdir_recs* with a non-empty 'dx_links' list, the
    value stored under the record's key is computed from the record's
    'class' annotation:
       no class     existing value(s) followed by the new links; a single
                    resulting element is stored bare, not as a list
       array:file   existing value(s) followed by the new links, as a list
       file         the single new link, replacing any existing value
    Any other combination is reported through report_error_and_exit.
    Finally the file is written back.
    '''

    # Load existing file, if it exists
    output_json = {}
    output_file = file_load_utils.get_output_json_file()
    if os.path.exists(output_file):
        with open(output_file, 'r') as fh:
            output_json = json.load(fh)

    def get_new_file_output_value(key, class_, dxlinks):
        # Get the old value as a list, an empty list if the value
        # does not exist yet.
        orig_value_as_list = output_json.get(key, [])
        if not isinstance(orig_value_as_list, list):
            orig_value_as_list = [orig_value_as_list]
        if class_ is None:
            # Make it into an array if there are already values there
            final_list = orig_value_as_list + dxlinks
            if len(final_list) == 1:
                return final_list[0]
            return final_list
        if class_ == 'array:file':
            return orig_value_as_list + dxlinks
        if class_ == 'file' and len(dxlinks) == 1:
            return dxlinks[0]
        # The key may not be in the file yet, so look it up with get():
        # indexing would raise KeyError and hide the error being reported.
        report_error_and_exit(
            "AppInternalError",
            "Type mismatch, key {} is of type {}, json={} num_dxlinks={}".
            format(key, class_, type(output_json.get(key)), len(dxlinks)))

    # Add all the entries
    for key in subdir_recs:
        subdir_desc = subdir_recs[key]
        dxlinks = subdir_desc['dx_links']
        if len(dxlinks) == 0:
            # nothing was uploaded for this key
            continue
        # the class annotation exists only if an output spec was applied
        class_ = subdir_desc.get('class')
        output_json[key] = get_new_file_output_value(key, class_, dxlinks)

    # write it back out
    with open(output_file, 'w') as fh:
        json.dump(output_json, fh, indent=4)


## entry point
output_spec = get_output_spec()
subdir_recs = get_output_subdir_info()

# Drop everything the user excluded with --except, both from the spec
# and from what was found in the output directory.
if args.exclude:
    if output_spec is not None:
        output_spec = file_load_utils.filter_dict(output_spec, args.exclude)
    subdir_recs = file_load_utils.filter_dict(subdir_recs, args.exclude)

# Validate against the output spec, when there is one
if output_spec is not None:
    compare_to_output_spec_and_annotate(subdir_recs, output_spec)

# Flatten the subdirectory records into a list with one entry per file.
# This avoids sharing data between concurrent threads.
# First pass: the files located directly under each output subdirectory.
to_upload = []
for key in subdir_recs:
    subdir_desc = subdir_recs[key]
    to_upload.extend(create_entry(subdir_desc, subdir_desc['path'], None, filename)
                     for filename in subdir_desc['files'])

# Second pass: the files located deeper in the tree. We are relying on
# the platform to create all subdirectories, this means that we do not
# need to explicitly create them.
for key in subdir_recs:
    subdir_desc = subdir_recs[key]
    for _, dfiles in zip(subdir_desc['child_dirs'], subdir_desc['child_dfiles']):
        for dfile in dfiles:
            rel_path, basename = os.path.dirname(dfile), os.path.basename(dfile)
            local_path = os.path.join(subdir_desc['path'], rel_path)
            to_upload.append(create_entry(subdir_desc, local_path, rel_path, basename))

# upload, concurrently if requested
upload_all = parallel_file_upload if args.parallel else sequential_file_upload
upload_all(to_upload, args.wait_on_close)

# Uploads are complete, we can collect all results. This avoids
# data races in the parallel upload case.
for entry in to_upload:
    entry['subdir_desc']['dx_links'].append(entry['dxlink'])

if args.clearJSON:
    file_load_utils.rm_output_json_file()
update_output_json(subdir_recs)
